-
Notifications
You must be signed in to change notification settings - Fork 508
feat: Add RedisStorageClient based on Redis v8.0+
#1406
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
RedisStorageClientRedisStorageClient based on Redis v8.0+
|
Performance test. 1 client Code to run import asyncio
from crawlee import ConcurrencySettings
from crawlee.crawlers import ParselCrawler, ParselCrawlingContext
from crawlee.http_clients import HttpxHttpClient
from crawlee.storage_clients import RedisStorageClient
CONNECTION = 'redis://localhost:6379'
async def main() -> None:
storage_client = RedisStorageClient(connection_string=CONNECTION)
http_client = HttpxHttpClient()
crawler = ParselCrawler(
storage_client=storage_client,
http_client=http_client,
concurrency_settings=ConcurrencySettings(desired_concurrency=20),
)
@crawler.router.default_handler
async def request_handler(context: ParselCrawlingContext) -> None:
context.log.info(f'Processing URL: {context.request.url}...')
data = {
'url': context.request.url,
'title': context.selector.css('title::text').get(),
}
await context.push_data(data)
await context.enqueue_links()
await crawler.run(['https://crawlee.dev'])
if __name__ == '__main__':
asyncio.run(main())[ParselCrawler] INFO Final request statistics:
┌───────────────────────────────┬────────────┐
│ requests_finished │ 2363 │
│ requests_failed │ 0 │
│ retry_histogram │ [2363] │
│ request_avg_failed_duration │ None │
│ request_avg_finished_duration │ 358.4ms │
│ requests_finished_per_minute │ 3545 │
│ requests_failed_per_minute │ 0 │
│ request_total_duration │ 14min 6.8s │
│ requests_total │ 2363 │
│ crawler_runtime │ 39.99s │
└───────────────────────────────┴────────────┘3 clients Code to run import asyncio
from concurrent.futures import ProcessPoolExecutor
from crawlee import ConcurrencySettings, service_locator
from crawlee.crawlers import ParselCrawler, ParselCrawlingContext
from crawlee.http_clients import HttpxHttpClient
from crawlee.storage_clients import RedisStorageClient
from crawlee.storages import RequestQueue
CONNECTION = 'redis://localhost:6379'
async def run(queue_name: str) -> None:
storage_client = RedisStorageClient(connection_string=CONNECTION)
service_locator.set_storage_client(storage_client)
queue = await RequestQueue.open(name=queue_name)
http_client = HttpxHttpClient()
crawler = ParselCrawler(
http_client=http_client,
request_manager=queue,
concurrency_settings=ConcurrencySettings(desired_concurrency=20),
)
@crawler.router.default_handler
async def request_handler(context: ParselCrawlingContext) -> None:
context.log.info(f'Processing URL: {context.request.url}...')
data = {
'url': context.request.url,
'title': context.selector.css('title::text').get(),
}
await context.push_data(data)
await context.enqueue_links()
await crawler.run(['https://crawlee.dev'])
def process_run(queue_name: str) -> None:
asyncio.run(run(queue_name))
def multi_run(queue_name: str = 'multi') -> None:
workers = 3
with ProcessPoolExecutor(max_workers=workers) as executor:
executor.map(process_run, [queue_name for i in range(workers)])
if __name__ == '__main__':
multi_run()[ParselCrawler] INFO Final request statistics:
┌───────────────────────────────┬────────────┐
│ requests_finished │ 779 │
│ requests_failed │ 0 │
│ retry_histogram │ [779] │
│ request_avg_failed_duration │ None │
│ request_avg_finished_duration │ 356.9ms │
│ requests_finished_per_minute │ 2996 │
│ requests_failed_per_minute │ 0 │
│ request_total_duration │ 4min 38.0s │
│ requests_total │ 779 │
│ crawler_runtime │ 15.60s │
└───────────────────────────────┴────────────┘
[ParselCrawler] INFO Final request statistics:
┌───────────────────────────────┬────────────┐
│ requests_finished │ 762 │
│ requests_failed │ 0 │
│ retry_histogram │ [762] │
│ request_avg_failed_duration │ None │
│ request_avg_finished_duration │ 360.0ms │
│ requests_finished_per_minute │ 2931 │
│ requests_failed_per_minute │ 0 │
│ request_total_duration │ 4min 34.3s │
│ requests_total │ 762 │
│ crawler_runtime │ 15.60s │
└───────────────────────────────┴────────────┘
[ParselCrawler] INFO Final request statistics:
┌───────────────────────────────┬────────────┐
│ requests_finished │ 822 │
│ requests_failed │ 0 │
│ retry_histogram │ [822] │
│ request_avg_failed_duration │ None │
│ request_avg_finished_duration │ 342.2ms │
│ requests_finished_per_minute │ 3161 │
│ requests_failed_per_minute │ 0 │
│ request_total_duration │ 4min 41.3s │
│ requests_total │ 822 │
│ crawler_runtime │ 15.60s │
└───────────────────────────────┴────────────┘ |
|
In Since a Bloom filter is a probabilistic data structure, the final data structure size is affected by the error probability; I used Memory consumption for records in the format 'https://crawlee.dev/{i}' (record size doesn't matter for Bloom filters): Redis Bloom filter:
Redis set:
Discussion about whether it's worth pursuing this approach is welcome! |
|
I haven't read the PR yet, but I did look into bloom filters for request deduplication in the past and what you wrote piqued my interest 🙂 I am a little worried about the chance of dropping a URL completely, even with a super small probability. Perhaps we should default to a solution that tolerates some percentage of the "opposite" errors and allows a URL to get processed multiple times in rare cases. A fixed size hash table is an example of such data structure. I don't know if anything more sophisticated exists. But maybe I have an irrational fear of probabilistic stuff 🙂 |
Yes, I agree that this may be a little disturbing. And if we go down this route, it will need to be highlighted separately for the user. But perhaps I am not sufficiently afraid of probabilistic structures, as I have used them before. 🙂 |
|
Since we have already added the ability to parameterize queue behavior in the SDK ( |
|
It also works with:
|
Thanks for checking that out! It's a real surprise to me that the Redis client is fully compatible with these. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just some small comments and questions. I am really looking forward to using this. Good work.
| data = [data] | ||
|
|
||
| async with self._get_pipeline() as pipe: | ||
| # Incorrect signature for args type in redis-py |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there an existing issue about this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, we can expect this to be fixed in the next release.
redis/redis-py#3780
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems to be part of the v7 release done 2 days ago. Maybe bump the version to redis = ["redis[hiredis] >= 7.0.0"] and if ok, remove the # type: ignore[arg-type]
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should reach out to somebody who knows Redis better to review this for us. Or do you feel confident @Pijukatel? 😁
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should mention that Redis persistence is unlike that of filesystem or SQL storage and link to https://redis.io/docs/latest/operate/oss_and_stack/management/persistence/
| return await response if isinstance(response, Awaitable) else response | ||
|
|
||
|
|
||
| def read_lua_script(file_path: Path) -> str: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Couldn't this accept just the file name and prepend the path to the lua_scripts directory automatically?
|
|
||
| # Call the notification only once | ||
| warnings.warn( | ||
| 'The RedisStorageClient is experimental and may change or be removed in future releases.', |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we'll want to remove it 🙂 The storage "schema" could change, though - perhaps we should mention that.
| Returns: | ||
| An instance for the opened or created storage client. | ||
| """ | ||
| internal_name = name or alias or cls._DEFAULT_NAME |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't understand this - does it mean that there is no difference in behavior of named and aliased storages?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for alias in the metadata object name=None.
internal_name is used to form the key prefix. As in FileSystemStorageClient, the folder name is formed.
Co-authored-by: Jan Buchar <[email protected]>
Feel free to invite someone more experienced with Redis to review; I was mainly focusing on the Python part in the review. Anyway, I tried running it a little and did not see anything wrong. It is also optional and experimental, so I would not be afraid to release it and improve it as we go. No existing user should be affected by this, even if there is some hidden bug. |
Description
This PR implements a storage client
RedisStorageClientbased on Redis v8+. The minimum version 8 requirement is due to the fact that all data structures used are only available starting from Redis Open-Source version 8, without any additional extensions.Testing
fakeredisis used